/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SHARE_SCHEMA
#include "share/schema/ob_part_mgr_util.h"

#include "lib/container/ob_iarray.h"
#include "lib/container/ob_array_iterator.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/schema/ob_schema_struct.h"
#include "share/schema/ob_table_schema.h"
#include "share/schema/ob_schema_mgr.h"
#include "share/ob_cluster_version.h"

#include "lib/oblog/ob_log.h"

namespace oceanbase {
using namespace common;

namespace share {
namespace schema {

/*
 * Fills part_ids with the physical partition ids described by partition_schema_.
 * part_ids is reset first, so any previous content is discarded.
 *
 * Currently this interface is only used when creating partitions. Consider the following scenarios:
 * 1. create/truncate table/tablegroup: in the resolver phase, part_array is filled according to max_used_part_id
 *  (except for non-partitioned tables, including hash_like partitions)
 * 2. When building tables for bootstrap, creating tenant partitions and system tables, part_array is empty,
 *  but part_id starts from 0 and is continuous (system tables do not perform partition management operations)
 *
 * Returns OB_SUCCESS on success; OB_ERR_UNEXPECTED when an id exceeds its max_used bound or a required
 * partition/subpartition object is missing; otherwise the error produced by GET_PART_ID or push_back.
 */
int ObPartIdsGenerator::gen(ObIArray<int64_t>& part_ids)
{
  int ret = OB_SUCCESS;
  part_ids.reset();

  const ObPartitionLevel part_level = partition_schema_.get_part_level();
  if (PARTITION_LEVEL_ZERO == part_level) {
    // Partition management operations may push up the max_used_part_id of non-partitioned tables,
    // resulting in inconsistencies with the part_id generated by the schema.
    // Because such operations are limited for non-partitioned tables, the part_id of a non-partitioned
    // table is assumed to be 0 for the time being, which is consistent with the rest of the code.
    if (OB_FAIL(part_ids.push_back(0))) {
      LOG_WARN("fail to push_back part_id", K(ret));
    }
    return ret;
  }

  const int64_t part_num = partition_schema_.get_first_part_num();
  ObPartition** part_array = partition_schema_.get_part_array();
  int64_t max_used_part_id = partition_schema_.get_part_option().get_max_used_part_id();
  if (max_used_part_id <= 0) {
    // No usable max_used_part_id recorded: fall back to the last index of a dense 0..part_num-1 range.
    max_used_part_id = part_num - 1;
  }

  for (int64_t i = 0; OB_SUCC(ret) && i < part_num; ++i) {
    // Default id: the i-th of the last part_num ids ending at max_used_part_id.
    // When part_array exists, GET_PART_ID presumably replaces it with the id stored in part_array[i].
    int64_t part_id = max_used_part_id - part_num + i + 1;
    if (OB_NOT_NULL(part_array) && OB_FAIL(GET_PART_ID(part_array, i, part_id))) {
      LOG_WARN("Failed to get part id", K(i), K(ret));
    } else if (part_id > max_used_part_id) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("part_id is invalid", K(ret), K(part_id), K(max_used_part_id));
    } else if (PARTITION_LEVEL_ONE == part_level) {
      // One-level table: the physical id is the part id itself.
      if (OB_FAIL(part_ids.push_back(part_id))) {
        LOG_WARN("fail to push_back part_id", K(ret), "phy_part_id", part_id);
      }
    } else if (PARTITION_LEVEL_TWO == part_level && partition_schema_.is_sub_part_template()) {
      // Template subpartitions: every partition has sub ids 0..sub_part_num-1.
      const int64_t sub_part_num = partition_schema_.get_sub_part_option().get_part_num();
      for (int64_t j = 0; OB_SUCC(ret) && j < sub_part_num; ++j) {
        const int64_t phy_part_id = generate_phy_part_id(part_id, j, part_level);
        if (OB_FAIL(part_ids.push_back(phy_part_id))) {
          LOG_WARN("fail to push_back part_id", K(ret), K(phy_part_id));
        }
      }
    } else if (OB_ISNULL(part_array)) {
      // Non-template subpartitions need the partition objects to enumerate their subpartitions.
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("part_array is null", KR(ret));
    } else if (OB_ISNULL(part_array[i])) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("part_array[i] is null", KR(ret));
    } else {
      ObPartition* cur_part = part_array[i];
      ObSubPartition** subpart_array = cur_part->get_subpart_array();
      const int64_t max_used_sub_part_id = cur_part->get_max_used_sub_part_id();
      for (int64_t j = 0; OB_SUCC(ret) && j < cur_part->get_subpartition_num(); ++j) {
        if (OB_ISNULL(subpart_array) || OB_ISNULL(subpart_array[j])) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("sub part array is null", KR(ret), K(i));
        } else {
          const int64_t subpart_id = subpart_array[j]->get_sub_part_id();
          if (subpart_id > max_used_sub_part_id) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("subpart_id is invalid", K(ret), K(subpart_id), K(max_used_sub_part_id));
          } else {
            const int64_t phy_part_id = generate_phy_part_id(part_id, subpart_id, part_level);
            if (OB_FAIL(part_ids.push_back(phy_part_id))) {
              LOG_WARN("fail to push_back part_id", K(ret), K(phy_part_id));
            }
          }
        }
      }
    }
  }

  return ret;
}

// Appends to part_ids the physical partition ids addressed by part_name (case-insensitive match).
//
// - A match on a first-level partition name yields that partition's id, or, on a two-level table,
//   the physical ids of all of its subpartitions (OB_NOT_SUPPORTED when the minimal cluster
//   version is below 2230).
// - On a two-level table, each non-matching partition is also searched for a subpartition whose
//   name matches.
//
// part_ids is not reset here. Returns OB_INVALID_ARGUMENT for an empty name, OB_ERR_UNEXPECTED for
// PARTITION_LEVEL_MAX or a null partition/subpartition, and OB_UNKNOWN_PARTITION when the
// iteration ends without any match.
int ObPartGetter::get_part_ids(const common::ObString& part_name, ObIArray<int64_t>& part_ids)
{
  int ret = OB_SUCCESS;
  const ObPartitionLevel part_level = table_.get_part_level();
  if (part_name.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invlaid part_name", K(ret), K(part_name));
  } else if (PARTITION_LEVEL_MAX == part_level) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get unexpected part level", K(ret), K(part_level));
  } else {
    bool find = false;
    bool check_dropped_schema = false;
    const ObPartition* part = NULL;
    ObString cmp_part_name;
    ObPartIteratorV2 iter(table_, check_dropped_schema);
    while (OB_SUCC(ret) && !find && OB_SUCC(iter.next(part))) {
      if (OB_ISNULL(part)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("get null partition", K(ret));
        continue;
      }
      cmp_part_name = part->get_part_name();
      LOG_DEBUG("cmp part name", K(cmp_part_name));
      const bool name_matched = ObCharset::case_insensitive_equal(part_name, cmp_part_name);
      if (!name_matched) {
        // Not a first-level match: on composite tables look among this partition's subpartitions.
        if (PARTITION_LEVEL_TWO == part_level &&
            OB_FAIL(get_subpart_ids_in_partition(part_name, *part, part_ids, find))) {
          LOG_WARN("failed to get subpart ids in partition", K(ret));
        }
      } else {
        // match level one part
        find = true;
        if (PARTITION_LEVEL_TWO != part_level) {
          if (OB_FAIL(part_ids.push_back(part->get_part_id()))) {
            LOG_WARN("failed to push back part id", K(ret));
          }
        } else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_2230) {
          ret = OB_NOT_SUPPORTED;
          LOG_WARN("match level one part while table partition level is two with minimal cluster < 2230", K(ret));
        } else {
          // Expand the matched partition into the physical ids of all its subpartitions.
          ObSubPartIteratorV2 sub_iter(table_, *part, check_dropped_schema);
          const ObSubPartition* subpart = NULL;
          while (OB_SUCC(ret) && OB_SUCC(sub_iter.next(subpart))) {
            if (OB_ISNULL(subpart)) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("get null subpartition", K(ret));
            } else {
              const int64_t phy_part_id =
                  common::generate_phy_part_id(part->get_part_id(), subpart->get_sub_part_id());
              if (OB_FAIL(part_ids.push_back(phy_part_id))) {
                LOG_WARN("failed to push back subpart id", K(ret));
              }
            }
          }
          // Running off the end of the subpartition list is the normal way to finish.
          if (OB_LIKELY(OB_ITER_END == ret)) {
            ret = OB_SUCCESS;
          }
        }
      }
    }
    if (!find && OB_ITER_END == ret) {
      ret = OB_UNKNOWN_PARTITION;
    }
  }
  return ret;
}

// Appends to part_ids the physical id of the subpartition named part_name.
//
// Only two-level tables can succeed: each partition is searched in turn through
// get_subpart_ids_in_partition() until a match is found.
// Returns OB_INVALID_ARGUMENT for an empty name, OB_ERR_NOT_PARTITIONED for a non-partitioned
// table, OB_UNKNOWN_SUBPARTITION for a one-level table or when nothing matches, and
// OB_ERR_UNEXPECTED for any other partition level or a null partition.
int ObPartGetter::get_subpart_ids(const common::ObString& part_name, ObIArray<int64_t>& part_ids)
{
  int ret = OB_SUCCESS;
  const ObPartitionLevel part_level = table_.get_part_level();
  if (part_name.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invlaid part_name", K(ret), K(part_name));
  } else {
    switch (part_level) {
      case PARTITION_LEVEL_ZERO: {
        ret = OB_ERR_NOT_PARTITIONED;
        LOG_WARN("table is not partitioned", K(ret));
        break;
      }
      case PARTITION_LEVEL_ONE: {
        // Oracle reports "Specified subpartition does not exist" when subpartition() is used
        // on a table that only has first-level partitions.
        ret = OB_UNKNOWN_SUBPARTITION;
        LOG_WARN("subpartition no exists", K(ret));
        break;
      }
      case PARTITION_LEVEL_TWO: {
        bool find = false;
        bool check_dropped_schema = false;
        const ObPartition* part = NULL;
        ObPartIteratorV2 iter(table_, check_dropped_schema);
        while (OB_SUCC(ret) && !find && OB_SUCC(iter.next(part))) {
          if (OB_ISNULL(part)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("get null partition", K(ret));
          } else if (OB_FAIL(get_subpart_ids_in_partition(part_name, *part, part_ids, find))) {
            LOG_WARN("failed to get subpart ids in partition", K(ret));
          }
        }
        // All partitions visited without a hit.
        if (!find && OB_ITER_END == ret) {
          ret = OB_UNKNOWN_SUBPARTITION;
        }
        break;
      }
      default: {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("get unexpected part level", K(ret), K(part_level));
        break;
      }
    }
  }
  return ret;
}

// Searches the subpartitions of `partition` for one whose name equals part_name (case-insensitive).
// On the first hit its physical partition id is appended to part_ids and `find` is set to true;
// `find` is always reset to false on entry.
// Reaching the end of the subpartitions without a hit is not an error (OB_SUCCESS, find == false).
// Returns OB_ERR_UNEXPECTED for a null subpartition, or the iterator/push_back error.
int ObPartGetter::get_subpart_ids_in_partition(
    const common::ObString& part_name, const ObPartition& partition, ObIArray<int64_t>& part_ids, bool& find)
{
  int ret = OB_SUCCESS;
  find = false;
  const bool check_dropped_partition = false;
  const ObSubPartition* subpart = NULL;
  ObString cmp_part_name;
  ObSubPartIteratorV2 sub_iter(table_, partition, check_dropped_partition);
  while (OB_SUCC(ret) && !find && OB_SUCC(sub_iter.next(subpart))) {
    if (OB_ISNULL(subpart)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("get null subpartition", K(ret));
      continue;
    }
    cmp_part_name = subpart->get_part_name();
    LOG_DEBUG("cmp part name", K(cmp_part_name));
    if (!ObCharset::case_insensitive_equal(part_name, cmp_part_name)) {
      // different name, keep scanning
    } else {
      const int64_t phy_part_id =
          common::generate_phy_part_id(partition.get_part_id(), subpart->get_sub_part_id());
      if (OB_FAIL(part_ids.push_back(phy_part_id))) {
        LOG_WARN("failed to push back subpart id", K(ret));
      } else {
        find = true;
      }
    }
  }
  // Exhausting the iterator just means "not in this partition".
  if (!find && OB_ITER_END == ret) {
    ret = OB_SUCCESS;
  }
  return ret;
}

// Decides whether partition_id is no longer referenced by partition_schema.
//
// - If the schema's partition_schema_version is 0 (no split recorded), `can` is simply the
//   negation of check_part_exist().
// - Otherwise every partition is visited: `can` becomes false when partition_id equals a visited
//   partition's id, or when the schema's partition status is not PARTITION_STATUS_ACTIVE and
//   partition_id appears in that partition's source_part_ids_.
//
// `can` defaults to true. Returns OB_SUCCESS, or the error from check_part_exist() / the iterator.
int ObPartMgrUtils::check_partition_can_remove(const ObPartitionSchema& partition_schema, const int64_t partition_id,
    const bool check_dropped_partition, bool& can)
{
  int ret = OB_SUCCESS;
  can = true;
  if (0 == partition_schema.get_partition_schema_version()) {
    // No split, no need to judge the source of split
    bool exist = false;
    if (OB_FAIL(ObPartMgrUtils::check_part_exist(partition_schema, partition_id, check_dropped_partition, exist))) {
      LOG_WARN("fail to check part exist", K(ret), K(partition_id), K(check_dropped_partition));
    } else {
      can = !exist;
    }
  } else {
    ObPartitionKeyIter iter(partition_schema.get_table_id(), partition_schema, check_dropped_partition);
    ObPartitionKeyIter::Info info;
    while (can && OB_SUCC(ret) && OB_SUCC(iter.next_partition_info(info))) {
      if (info.partition_id_ == partition_id) {
        can = false;
      } else if (PARTITION_STATUS_ACTIVE != partition_schema.get_partition_status() &&
                 OB_NOT_NULL(info.source_part_ids_)) {
        if (has_exist_in_array(*(info.source_part_ids_), partition_id)) {
          can = false;
        }
      }
    }
    if (OB_ITER_END == ret) {
      // Reaching the end of the partitions is the normal termination.
      ret = OB_SUCCESS;
    } else if (OB_FAIL(ret)) {
      // Only warn on a real failure: the loop also stops with OB_SUCCESS once `can` turns false.
      LOG_WARN("fail to iter", K(ret));
    }
  }
  return ret;
}

// Reports through `exist` whether partition_id denotes a partition of partition_schema.
//
// - PARTITION_LEVEL_ZERO: only id 0 exists.
// - PARTITION_LEVEL_ONE: looked up with get_partition_index_by_id(); OB_ENTRY_NOT_EXIST is
//   translated into exist == false with OB_SUCCESS.
// - PARTITION_LEVEL_TWO: partition_id is split into a part id and a subpart id; the partition is
//   fetched by part id, then the subpart id is checked against the subpartition count (template
//   tables) or against the ids of the iterated subpartitions (non-template tables).
//
// check_dropped_partition is forwarded to the lookups/iterators.
// Returns OB_ERR_UNEXPECTED for any other partition level or a null subpartition.
int ObPartMgrUtils::check_part_exist(const ObPartitionSchema& partition_schema, const int64_t partition_id,
    const bool check_dropped_partition, bool& exist)
{
  int ret = OB_SUCCESS;
  exist = false;

  const ObPartitionLevel part_level = partition_schema.get_part_level();
  if (PARTITION_LEVEL_ZERO == part_level) {
    exist = 0 == partition_id;
  } else if (PARTITION_LEVEL_ONE == part_level) {
    int64_t part_index = 0;
    if (OB_FAIL(partition_schema.get_partition_index_by_id(partition_id, check_dropped_partition, part_index))) {
      if (OB_ENTRY_NOT_EXIST == ret) {
        // A missing entry is an answer, not a failure.
        ret = OB_SUCCESS;
        exist = false;
      } else {
        LOG_WARN("failed to get partition idex", K(ret), K(partition_id));
      }
    } else {
      exist = true;
    }
  } else if (PARTITION_LEVEL_TWO == part_level) {
    // Keep the extracted ids 64-bit wide, like partition_id and the ids they are compared with,
    // instead of narrowing them to int.
    const int64_t part_id = extract_part_idx(partition_id);
    const int64_t subpart_id = extract_subpart_idx(partition_id);
    const ObPartition* partition = NULL;
    if (OB_FAIL(partition_schema.get_partition_by_part_id(part_id, check_dropped_partition, partition))) {
      LOG_WARN("failed to get partition index", K(ret), K(part_id), K(partition_id));
    } else if (OB_ISNULL(partition)) {
      exist = false;
    } else {
      int64_t subpart_num = partition->get_sub_part_num();
      if (partition_schema.is_sub_part_template()) {
        // Template secondary partition table, no subpartition objects for delayed deletion,
        // and partition management operations are not allowed
        exist = subpart_id < subpart_num;
      } else {
        ObSubPartIteratorV2 subpart_iter(partition_schema, *partition, check_dropped_partition);
        const ObSubPartition* subpart = NULL;
        while (!exist && OB_SUCC(ret) && OB_SUCC(subpart_iter.next(subpart))) {
          if (OB_ISNULL(subpart)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("fail to iter subpart", K(ret), K(partition_schema), KPC(partition));
          } else if (subpart->get_sub_part_id() == subpart_id) {
            exist = true;
          }
        }
        if (OB_SUCC(ret)) {
          // loop stopped because the subpartition was found
        } else if (OB_ITER_END == ret) {
          // all subpartitions visited without a match
          ret = OB_SUCCESS;
        } else {
          LOG_WARN("fail to iter", K(ret), K(partition_schema));
        }
      }
    }
  } else {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected part level", K(ret), K(part_level));
  }

  return ret;
}

/*
 * Fetches the partition schema identified by schema_id from schema_guard.
 *
 * Table schemas and (post-2.0) tablegroup schemas are derived classes of the partition schema, so this
 * interface only serves those two kinds of schema.
 * To keep the interface safe, the caller also passes schema_type and it is checked against schema_id:
 * a tablegroup id must come with TABLEGROUP_SCHEMA and any other id with TABLE_SCHEMA, otherwise the
 * call fails with OB_ERR_UNEXPECTED.
 *
 * Returns OB_INVALID_ARGUMENT for OB_INVALID_ID, OB_TABLEGROUP_NOT_EXIST / OB_TABLE_NOT_EXIST when the
 * guard yields a null schema, or the error of the guard lookup. partition_schema is written only on success.
 */
int ObPartMgrUtils::get_partition_schema(share::schema::ObSchemaGetterGuard& schema_guard, const uint64_t schema_id,
    const ObSchemaType schema_type, const share::schema::ObPartitionSchema*& partition_schema)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(OB_INVALID_ID == schema_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(schema_id));
  } else {
    const bool is_tablegroup_id = is_new_tablegroup_id(schema_id);
    if (is_tablegroup_id && ObSchemaType::TABLEGROUP_SCHEMA == schema_type) {
      const share::schema::ObTablegroupSchema* tablegroup = nullptr;
      if (OB_FAIL(schema_guard.get_tablegroup_schema(schema_id, tablegroup))) {
        LOG_WARN("fail to get tablegroup schema", K(ret), "tg_id", schema_id);
      } else if (OB_UNLIKELY(nullptr == tablegroup)) {
        // A dedicated error code is returned for lost_replica_checker, which tolerates a missing
        // schema and needs to tell that case apart; the table branch below does the same.
        // Other current users of this interface do not branch on the error code.
        ret = OB_TABLEGROUP_NOT_EXIST;
        LOG_WARN("tg schema ptr is null", K(ret), "tg_id", schema_id);
      } else {
        partition_schema = tablegroup;
      }
    } else if (!is_tablegroup_id && ObSchemaType::TABLE_SCHEMA == schema_type) {
      const share::schema::ObSimpleTableSchemaV2* table = nullptr;
      if (OB_FAIL(schema_guard.get_table_schema(schema_id, table))) {
        LOG_WARN("fail to get table schema", K(ret), "table_id", schema_id);
      } else if (OB_UNLIKELY(nullptr == table)) {
        ret = OB_TABLE_NOT_EXIST;
        LOG_WARN("table schema ptr is null", K(ret), "table_id", schema_id);
      } else {
        partition_schema = table;
      }
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("schema type and schema id not match", K(ret), K(schema_type), K(schema_id));
    }
  }
  return ret;
}

// Appends to partition_schemas every table schema and tablegroup schema of tenant_id for which
// has_self_partition() is true: tables first, then tablegroups.
// partition_schemas is not reset here.
// Returns OB_INVALID_ARGUMENT for OB_INVALID_ID, OB_ERR_UNEXPECTED if the guard hands back a null
// schema pointer, or the error of the guard lookup / push_back.
int ObPartMgrUtils::get_partition_entity_schemas_in_tenant(share::schema::ObSchemaGetterGuard& schema_guard,
    const uint64_t tenant_id, common::ObIArray<const share::schema::ObPartitionSchema*>& partition_schemas)
{
  int ret = OB_SUCCESS;
  ObArray<const ObSimpleTableSchemaV2*> table_schemas;
  ObArray<const ObTablegroupSchema*> tablegroup_schemas;
  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tenant_id));
  } else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id, table_schemas))) {
    LOG_WARN("fail to get table schemas in tenant", K(ret), K(tenant_id));
  } else if (OB_FAIL(schema_guard.get_tablegroup_schemas_in_tenant(tenant_id, tablegroup_schemas))) {
    LOG_WARN("fail to get tablegroup schemas in tenant", K(ret), K(tenant_id));
  } else {
    // Tables: keep only those that own partitions themselves.
    const int64_t table_cnt = table_schemas.count();
    for (int64_t i = 0; OB_SUCC(ret) && i < table_cnt; ++i) {
      const ObSimpleTableSchemaV2* table_schema = table_schemas.at(i);
      if (OB_UNLIKELY(nullptr == table_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("table schema ptr is null", K(ret), KP(table_schema));
      } else if (table_schema->has_self_partition() && OB_FAIL(partition_schemas.push_back(table_schema))) {
        LOG_WARN("fail to push back", K(ret), KP(table_schema));
      }
    }
    // Tablegroups: same filter.
    const int64_t tablegroup_cnt = tablegroup_schemas.count();
    for (int64_t i = 0; OB_SUCC(ret) && i < tablegroup_cnt; ++i) {
      const ObTablegroupSchema* tg_schema = tablegroup_schemas.at(i);
      if (OB_UNLIKELY(nullptr == tg_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("tg schema ptr is null", K(ret), KP(tg_schema));
      } else if (tg_schema->has_self_partition() && OB_FAIL(partition_schemas.push_back(tg_schema))) {
        LOG_WARN("fail to push back", K(ret), KP(tg_schema));
      }
    }
  }
  return ret;
}

// Returns true when partition_id is present in partition_ids.
// The lookup is a binary search (std::lower_bound), so partition_ids must already be sorted in
// ascending order; this function does not verify that.
bool ObPartMgrUtils::exist_partition_id(ObArray<int64_t>& partition_ids, const int64_t& partition_id)
{
  // First position whose element is not less than partition_id, or end() if there is none.
  ObArray<int64_t>::iterator pos = std::lower_bound(partition_ids.begin(), partition_ids.end(), partition_id);
  return pos != partition_ids.end() && pos != NULL && *pos == partition_id;
}

// return phy_partition_id
// store hash for partitionID of old table, loop part ID of new_table.
// If it can be found in the hash, delete it, if it can't find it, add a new part
// The remaining elements in hash are the deleted parts
int ObPartMgrUtils::get_part_diff(const ObPartitionSchema& old_table, const ObPartitionSchema& new_table,
    ObIArray<int64_t>& part_dropped, ObIArray<int64_t>& part_added)
{
  int ret = OB_SUCCESS;
  part_dropped.reset();
  part_added.reset();

  if (old_table.get_table_id() != new_table.get_table_id()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invlaid arg", K(ret), K(old_table.get_table_id()), K(new_table.get_table_id()));
  } else if (old_table.get_schema_version() == new_table.get_schema_version()) {
    // do-nothing
  } else {
    bool check_dropped_schema = false;
    ObPartitionKeyIter old_table_iter(old_table.get_table_id(), old_table, check_dropped_schema);
    ObPartitionKeyIter new_table_iter(new_table.get_table_id(), new_table, check_dropped_schema);
    int64_t partition_id = -1;
    common::hash::ObHashSet<int64_t> old_partition_ids;
    // If it is a non-partitioned table, at least one partition is required. Partition 0
    int64_t hash_count = min(1000, old_table.get_partition_num() + 1);
    ObArray<int64_t> new_partition_ids;
    if (OB_FAIL(old_partition_ids.create(hash_count))) {
      LOG_WARN("failed to create hash set", K(ret), "num", hash_count, K(old_table));
    }
    while (OB_SUCC(ret) && OB_SUCC(old_table_iter.next_partition_id_v2(partition_id))) {
      if (OB_FAIL(old_partition_ids.set_refactored(partition_id))) {
        LOG_WARN("failed to insert key into set", K(ret), K(partition_id), K(old_table));
      }
    }
    if (OB_ITER_END == ret) {
      ret = OB_SUCCESS;
    }
    while (OB_SUCC(ret) && OB_SUCC(new_table_iter.next_partition_id_v2(partition_id))) {
      ret = new_partition_ids.push_back(partition_id);
    }
    if (OB_ITER_END == ret) {
      ret = OB_SUCCESS;
    }
    if (OB_SUCC(ret)) {
      for (int64_t i = 0; OB_SUCC(ret) && i < new_partition_ids.count(); ++i) {
        partition_id = new_partition_ids.at(i);
        if (OB_HASH_EXIST == (ret = old_partition_ids.exist_refactored(partition_id))) {
          // If found, delete the element in old
          if (OB_FAIL(old_partition_ids.erase_refactored(partition_id))) {
            LOG_WARN("failed to erase refactored", K(ret), K(i), K(partition_id));
          }
        } else if (OB_HASH_NOT_EXIST == ret) {
          // If it does not exist, it is the newly added part
          if (OB_FAIL(part_added.push_back(partition_id))) {
            LOG_WARN("failed to push back", K(ret), K(i), K(partition_id));
          }
        } else {
          LOG_WARN("failed to find partition", K(ret), K(i), K(partition_id));
        }
      }  // end for find part_added
      if (OB_SUCC(ret) && 0 < old_partition_ids.size()) {
        // The remaining elements in old are all deleted parts
        common::hash::ObHashSet<int64_t>::iterator iter = old_partition_ids.begin();
        for (; iter != old_partition_ids.end() && OB_SUCC(ret); ++iter) {
          if (OB_FAIL(part_dropped.push_back(iter->first))) {
            LOG_WARN("failed to push back", K(ret), K(old_partition_ids));
          }
        }
      }
    }
  }

  return ret;
}

// Validates the iterator before use.
// Returns OB_NOT_INIT if the iterator was not initialised, OB_ERR_UNEXPECTED if partition_schema_
// is null, OB_SUCCESS otherwise.
int ObPartIteratorV2::check_inner_stat()
{
  int ret = OB_SUCCESS;
  if (is_inited_) {
    if (OB_ISNULL(partition_schema_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret), K(partition_schema_));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not initt", K(ret));
  }
  return ret;
}

int ObPartIteratorV2::next(const ObPartition*& part)
{
  int ret = OB_SUCCESS;
  part = NULL;

  if (OB_FAIL(check_inner_stat())) {
    LOG_WARN("check inner stat error");
  } else {
    const ObPartitionLevel part_level = partition_schema_->get_part_level();
    // In the partition management operation, partition_num is the newly added partition,
    // and part_num is the partition after the split
    // When the two results are not equal, use the actual value partition_num to iterate,
    // and the split hash partition also has an array
    // The partition_array of the system table is empty and needs to be processed by part_num
    int64_t part_num = OB_INVALID_ID;
    int64_t total_part_num = OB_INVALID_ID;
    if (partition_schema_->is_in_splitting()) {
      part_num = partition_schema_->get_partition_num();
    } else {
      part_num = partition_schema_->get_part_option().get_part_num();
    }
    total_part_num = part_num + (check_dropped_schema_ ? partition_schema_->get_dropped_partition_num() : 0);
    part_.reset();
    if (idx_++ >= total_part_num - 1) {
      ret = OB_ITER_END;
    } else {
      int64_t actual_idx = idx_;
      ObPartition** part_array = partition_schema_->get_part_array();
      if (OB_SUCC(ret) && check_dropped_schema_ && idx_ >= part_num) {
        actual_idx = idx_ - part_num;
        part_array = partition_schema_->get_dropped_part_array();
      }
      if (PARTITION_LEVEL_ZERO == part_level) {
        int n = snprintf(buf_, BUF_LEN, "p%ld", actual_idx);
        if (n < 0 || n >= BUF_LEN) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("snprintf failed", K(n), LITERAL_K(BUF_LEN));
        } else if (0 != actual_idx) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("idx is invalid", K(ret), K(actual_idx));
        } else {
          ObString part_name(n, buf_);
          part_.set_part_name(part_name);
          part_.set_part_id(0);
          part = &part_;
        }
      } else if (PARTITION_LEVEL_ONE == part_level || PARTITION_LEVEL_TWO == part_level) {
        if (OB_ISNULL(part_array)) {
          int n = snprintf(buf_, BUF_LEN, "p%ld", actual_idx);
          if (n < 0 || n >= BUF_LEN) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("snprintf failed", K(n), LITERAL_K(BUF_LEN));
          } else {
            ObString part_name(n, buf_);
            part_.set_part_name(part_name);
            part_.set_part_id(actual_idx);
            part_.set_mapping_pg_part_id(actual_idx);
            part = &part_;
          }
        } else {
          part = part_array[actual_idx];
        }
      } else {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected part level", K(ret), K(part_level));
      }
    }
  }
  return ret;
}

// Validates the iterator before use.
// Returns OB_NOT_INIT if the iterator was not initialised, OB_ERR_UNEXPECTED if partition_schema_
// is null, OB_SUCCESS otherwise. part_ is only logged, not validated.
int ObSubPartIteratorV2::check_inner_stat()
{
  int ret = OB_SUCCESS;
  if (is_inited_) {
    if (OB_ISNULL(partition_schema_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret), K(partition_schema_), K(part_));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  }
  return ret;
}

int ObSubPartIteratorV2::next_for_template(const ObSubPartition*& subpart)
{
  int ret = OB_SUCCESS;
  const int64_t subpart_num = partition_schema_->get_sub_part_option().get_part_num();
  ObSubPartition** subpart_array = partition_schema_->get_def_subpart_array();
  subpart_.reset();
  if (idx_++ >= subpart_num - 1) {
    ret = OB_ITER_END;
  } else {
    int n = -1;
    if (partition_schema_->is_range_subpart() || partition_schema_->is_list_subpart()) {
      ret = subpart_.assign(*subpart_array[idx_]);
    } else {
      if (nullptr != subpart_array && nullptr != subpart_array[idx_]) {
        ret = subpart_.assign(*subpart_array[idx_]);
      } else {
        subpart_.set_sub_part_id(idx_);
        subpart_.set_mapping_pg_sub_part_id(idx_);
      }
    }
    if (OB_FAIL(ret)) {
      // failed
    } else if (nullptr == part_) {
      // do not touch part name
    } else if (partition_schema_->is_range_subpart() || partition_schema_->is_list_subpart()) {
      const ObString& part_name = part_->get_part_name();
      const ObString& subpart_name = subpart_.get_part_name();
      n = snprintf(
          buf_, BUF_LEN, "%.*ss%.*s", part_name.length(), part_name.ptr(), subpart_name.length(), subpart_name.ptr());
    } else {
      const ObString& part_name = part_->get_part_name();
      n = snprintf(buf_, BUF_LEN, "%.*ssp%ld", part_name.length(), part_name.ptr(), subpart_.get_sub_part_id());
    }
    if (OB_SUCC(ret)) {
      if (nullptr == part_) {
        // do not touch part name, remain unchanged
      } else if (n < 0 || n >= BUF_LEN) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("snprintf failed", K(n), LITERAL_K(BUF_LEN));
      } else {
        ObString subpart_name(n, buf_);
        subpart_.set_part_name(subpart_name);
      }
    }
    if (OB_SUCC(ret)) {
      subpart = &subpart_;
    }
  }
  return ret;
}

// Returns through `subpart` the next subpartition object of part_ for a non-template schema.
// Regular subpartitions are visited first; when check_dropped_partition_ is set, the dropped
// subpartitions of part_ follow.
// Returns OB_ITER_END when exhausted, OB_ERR_UNEXPECTED if part_, the selected array or the
// selected entry is NULL.
int ObSubPartIteratorV2::next_for_nontemplate(const ObSubPartition*& subpart)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(part_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("part_ is NULL nontemplate subpartition iterator", KR(ret));
  } else {
    const int64_t subpart_num = part_->get_subpartition_num();
    const int64_t total_subpart_num =
        subpart_num + (check_dropped_partition_ ? part_->get_dropped_subpartition_num() : 0);
    if (idx_++ >= total_subpart_num - 1) {
      ret = OB_ITER_END;
    } else {
      // Positions past subpart_num address the dropped subpartition array.
      const bool in_dropped = idx_ >= subpart_num;
      const int64_t actual_idx = in_dropped ? idx_ - subpart_num : idx_;
      ObSubPartition** subpart_array = NULL;
      if (in_dropped) {
        subpart_array = part_->get_dropped_subpart_array();
      } else {
        subpart_array = part_->get_subpart_array();
      }
      if (OB_ISNULL(subpart_array)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("subpartition array in partition is NULL", KR(ret));
      } else if (OB_ISNULL(subpart_array[actual_idx])) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("subpartition in partition is NULL", K(total_subpart_num), K(subpart_num), K(actual_idx), KR(ret));
      } else {
        subpart = subpart_array[actual_idx];
      }
    }
  }
  return ret;
}

// Advances the iterator and returns the next subpartition through `subpart` (NULL on failure).
// Dispatches to next_for_template() or next_for_nontemplate() depending on whether the schema uses
// template subpartitions.
// part_ == NULL is only used by ObAllVirtualProxySubPartition; it means "iterate the subpartition
// names of a template table".
int ObSubPartIteratorV2::next(const ObSubPartition*& subpart)
{
  int ret = OB_SUCCESS;
  subpart = NULL;

  if (OB_FAIL(check_inner_stat())) {
    LOG_WARN("check inner stat error");
  } else {
    ret = partition_schema_->is_sub_part_template() ? next_for_template(subpart) : next_for_nontemplate(subpart);
  }
  return ret;
}

// Validates the iterator before use.
// Returns OB_NOT_INIT if the iterator was not initialised, OB_ERR_UNEXPECTED if partition_schema_
// is null, OB_SUCCESS otherwise.
int ObDroppedPartIterator::check_inner_stat()
{
  int ret = OB_SUCCESS;
  if (is_inited_) {
    if (OB_ISNULL(partition_schema_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret), K(partition_schema_));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not initt", K(ret));
  }
  return ret;
}

int ObDroppedPartIterator::next(const ObPartition*& part)
{
  int ret = OB_SUCCESS;
  part = NULL;
  if (OB_FAIL(check_inner_stat())) {
    LOG_WARN("check inner stat error");
  } else if (idx_ >= partition_schema_->get_dropped_partition_num()) {
    ret = OB_ITER_END;
  } else {
    part = partition_schema_->get_dropped_part_array()[idx_++];
  }
  return ret;
}

// Builds an iterator over the partition keys of partition_schema.
// Only the address of partition_schema is stored, so the schema must stay alive for as long as the
// iterator is used. part_ starts as NULL; next_partition_info() initialises the inner partition
// iterator while part_ is still NULL.
// check_dropped_schema is stored and later forwarded to the inner iterators.
ObPartitionKeyIter::ObPartitionKeyIter(
    const uint64_t schema_id, const ObPartitionSchema& partition_schema, bool check_dropped_schema)
    : schema_id_(schema_id),
      partition_schema_(&partition_schema),
      part_(NULL),
      check_dropped_schema_(check_dropped_schema)
{
  // Cache the partition level once.
  part_level_ = partition_schema.get_part_level();
}

int64_t ObPartitionKeyIter::get_partition_cnt() const
{
  int64_t partition_cnt = 0;
  if (partition_schema_ != NULL) {
    partition_cnt = partition_schema_->get_partition_cnt();
  }
  return partition_cnt;
}

// FIXME: Change the iterative method of pkey without relying on this interface
int64_t ObPartitionKeyIter::get_partition_num() const
{
  int ret = OB_SUCCESS;
  int64_t partition_num = 0;
  if (OB_NOT_NULL(partition_schema_) &&
      OB_FAIL(partition_schema_->get_all_partition_num(check_dropped_schema_, partition_num))) {
    LOG_WARN("fail to get all partition num", K(ret), KPC_(partition_schema));
  }
  partition_num = OB_FAIL(ret) ? -1 : partition_num;
  return partition_num;
}

int ObPartitionKeyIter::next_partition_id_v2(int64_t& partition_id)
{
  int ret = OB_SUCCESS;
  ObPartitionKeyIter::Info info;
  if (OB_FAIL(next_partition_info(info))) {
    if (OB_ITER_END != ret) {
      LOG_WARN("fail to get next partition info", K(ret));
    }
  } else {
    partition_id = info.partition_id_;
  }
  return ret;
}

// Advance to the next physical partition and describe it in `info`.
//
// When the schema is not two-level, every call moves part_iter_ forward by one first-level
// partition. For two-level schemas the current first-level partition is kept in part_ and
// subpart_iter_ walks its sub-partitions; once they are exhausted, part_iter_ is advanced
// and subpart_iter_ is re-initialized on the new first-level partition.
//
// @param info [out] partition_id_, tg_partition_id_, drop_schema_version_ and source_part_ids_
//                   of the partition just visited; only meaningful when OB_SUCCESS is returned.
// @return OB_SUCCESS on success, OB_ITER_END when the underlying iterators are exhausted,
//         OB_ERR_UNEXPECTED if an iterator hands back a NULL partition/sub-partition,
//         or any other error reported by the iterators.
int ObPartitionKeyIter::next_partition_info(ObPartitionKeyIter::Info& info)
{
  int ret = OB_SUCCESS;

  if (NULL == part_) {
    // No current first-level partition: (re)initialize the first-level iterator.
    part_iter_.init(*partition_schema_, check_dropped_schema_);
  }
  if (NULL == part_ || PARTITION_LEVEL_TWO != part_level_) {
    if (OB_FAIL(part_iter_.next(part_))) {
      if (ret != OB_ITER_END) {
        LOG_WARN("get next partition failed", K(ret));
      }
    } else if (OB_ISNULL(part_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret), K(part_));
    } else if (PARTITION_LEVEL_TWO == part_level_) {
      subpart_iter_.init(*partition_schema_, *part_, check_dropped_schema_);
    }
  }
  if (OB_SUCC(ret)) {
    // First-level view of the partition; for two-level schemas these are refined below.
    info.partition_id_ = part_->get_part_id();
    info.drop_schema_version_ = part_->get_drop_schema_version();
    info.tg_partition_id_ = PARTITION_LEVEL_ZERO == part_level_ ? 0 : part_->get_mapping_pg_part_id();
    info.source_part_ids_ = &(part_->get_source_part_ids());
    if (PARTITION_LEVEL_TWO == part_level_) {
      const ObSubPartition* subpart = NULL;
      if (OB_FAIL(subpart_iter_.next(subpart))) {
        if (ret != OB_ITER_END) {
          LOG_WARN("get next subpart failed", K(ret));
        } else if (OB_FAIL(part_iter_.next(part_))) {
          // Sub-partitions of the current first-level partition are exhausted and there is
          // no further first-level partition (or the iterator failed).
          if (ret != OB_ITER_END) {
            LOG_WARN("get next part failed", K(ret));
          }
        } else if (OB_ISNULL(part_)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("NULL ptr", K(ret), K(part_));
        } else if (FALSE_IT(subpart_iter_.init(*partition_schema_, *part_, check_dropped_schema_))) {
          // will never be here
        } else if (OB_FAIL(subpart_iter_.next(subpart))) {
          if (ret != OB_ITER_END) {
            LOG_WARN("get next subpart failed", K(ret));
          }
        }
      }
      if (OB_SUCC(ret)) {
        if (OB_ISNULL(subpart)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("NULL ptr", K(ret), K(subpart));
        } else {
          info.partition_id_ =
              generate_phy_part_id(part_->get_part_id(), subpart->get_sub_part_id(), PARTITION_LEVEL_TWO);

          // Non-templated secondary partition, subpart is the physical_partition_id of pg
          if (!partition_schema_->is_sub_part_template()) {
            info.tg_partition_id_ = subpart->get_mapping_pg_sub_part_id();
          } else {
            info.tg_partition_id_ = generate_phy_part_id(
                part_->get_mapping_pg_part_id(), subpart->get_mapping_pg_sub_part_id(), PARTITION_LEVEL_TWO);
          }

          info.source_part_ids_ = &(subpart->get_source_part_ids());

          // part_ may have been advanced above after the first-level fields were captured.
          // Re-read the drop version from the first-level partition that actually owns
          // `subpart`, otherwise the first sub-partition of a new first-level partition
          // would carry the previous partition's drop_schema_version.
          info.drop_schema_version_ = part_->get_drop_schema_version();

          // Subject to the actual record of the secondary partition
          // 1. The templated second-level partition table only has first-level partition additions and deletions,
          //  and the template second-level partition information has no drop_schema_version, so the first-level
          //  partition shall prevail;
          // 2. The non-templated secondary partition table is subject to the secondary partition
          //  (the addition and deletion of the primary partition will write the primary partition
          //  and the drop_schema_version corresponding to the secondary partition at the same time)
          if (subpart->get_drop_schema_version() > 0) {
            info.drop_schema_version_ = subpart->get_drop_schema_version();
          }
        }
      }
    }
  }

  return ret;
}

// Fetch the next partition id and build the corresponding partition key.
// @param pkey [out] initialized with (schema id, partition id, partition cnt) on success.
// @return OB_SUCCESS, OB_ITER_END at the end of iteration, or the error from
//         next_partition_id_v2() / pkey.init().
int ObPartitionKeyIter::next_partition_key_v2(common::ObPartitionKey& pkey)
{
  int ret = OB_SUCCESS;
  int64_t next_id = 0;
  if (OB_SUCC(next_partition_id_v2(next_id))) {
    if (OB_FAIL(pkey.init(get_schema_id(), next_id, get_partition_cnt()))) {
      LOG_WARN("Failed to init partition key", K(ret));
    }
  } else if (OB_ITER_END != ret) {
    LOG_WARN("Get next partition id failed", K(ret));
  }
  return ret;
}

// Wraps an ObPartitionKeyIter over `partition_schema` that is always constructed with
// check_dropped_schema = true; the filtering on drop_schema_version_ happens in
// next_partition_info().
ObDroppedPartitionKeyIter::ObDroppedPartitionKeyIter(
    const uint64_t schema_id, const ObPartitionSchema& partition_schema)
    : iter_(schema_id, partition_schema, true /*check_dropped_schema*/)
{}

// Convenience wrapper over next_partition_info() that only exposes the partition id.
// @param partition_id [out] written only on success.
// @return whatever next_partition_info() returns; OB_ITER_END is not logged here.
int ObDroppedPartitionKeyIter::next_partition_id(int64_t& partition_id)
{
  int ret = OB_SUCCESS;
  ObPartitionKeyIter::Info dropped_info;
  if (OB_SUCC(next_partition_info(dropped_info))) {
    partition_id = dropped_info.partition_id_;
  } else if (OB_ITER_END != ret) {
    LOG_WARN("fail to iter next partition info", K(ret));
  }
  return ret;
}

// Advance the wrapped iterator until it yields a partition whose drop_schema_version_
// is positive, i.e. skip every entry with drop_schema_version_ <= 0.
// @param info [out] description of the partition found; only meaningful on OB_SUCCESS.
// @return OB_SUCCESS when such a partition was found, OB_ITER_END when the wrapped
//         iterator is exhausted, or the error it reported.
int ObDroppedPartitionKeyIter::next_partition_info(ObPartitionKeyIter::Info& info)
{
  int ret = OB_SUCCESS;
  while (OB_SUCC(ret) && OB_SUCC(iter_.next_partition_info(info)) && info.drop_schema_version_ <= 0) {
    // entry has no positive drop_schema_version_: keep scanning
  }
  // Only a real failure deserves a warning: OB_SUCCESS means a matching partition was
  // found and OB_ITER_END is the regular end of iteration.
  if (OB_SUCCESS != ret && OB_ITER_END != ret) {
    LOG_WARN("fail to iter", K(ret));
  }
  return ret;
}

// Builds the embedded ObPartitionKeyIter from the table's id and schema;
// check_dropped_schema is forwarded unchanged.
ObTablePartitionKeyIter::ObTablePartitionKeyIter(const ObSimpleTableSchemaV2& table_schema, bool check_dropped_schema)
    : partition_key_iter_(table_schema.get_table_id(), table_schema, check_dropped_schema)
{
  // NOTE(review): the two lines below are commented-out dead code (a heap-allocating
  // variant of the initialization done in the initializer list); candidate for removal.
  // ObPartitionKeyIter *partition_key_it = new ObPartitionKeyIter(table_schema.get_table_id(), table_schema);
  // partition_key_iter_ = *partition_key_it;
}

// Forward to the embedded ObPartitionKeyIter.
// @param partition_id [out] next partition id, as written by the embedded iterator.
// @return the embedded iterator's return code; only non-OB_ITER_END failures are logged.
int ObTablePartitionKeyIter::next_partition_id_v2(int64_t& partition_id)
{
  int ret = partition_key_iter_.next_partition_id_v2(partition_id);
  if (OB_SUCCESS != ret && OB_ITER_END != ret) {
    LOG_WARN("Get next partition id failed", K(ret));
  }
  return ret;
}

// Fetch the next partition id of the table and build its partition key.
// @param pkey [out] initialized with (table id, partition id, partition cnt) on success.
// @return OB_SUCCESS, OB_ITER_END at the end of iteration, or the error from
//         next_partition_id_v2() / pkey.init().
int ObTablePartitionKeyIter::next_partition_key_v2(ObPartitionKey& pkey)
{
  int ret = OB_SUCCESS;
  int64_t next_id = 0;
  if (OB_SUCC(next_partition_id_v2(next_id))) {
    if (OB_FAIL(pkey.init(get_table_id(), next_id, get_partition_cnt()))) {
      LOG_WARN("Failed to init partition key", K(ret));
    }
  } else if (OB_ITER_END != ret) {
    LOG_WARN("Get next partition id failed", K(ret));
  }
  return ret;
}

// Builds the embedded ObPartitionKeyIter from the tablegroup's id and schema;
// check_dropped_schema is forwarded unchanged.
ObTablegroupPartitionKeyIter::ObTablegroupPartitionKeyIter(
    const ObTablegroupSchema& tablegroup_schema, bool check_dropped_schema)
    : partition_key_iter_(tablegroup_schema.get_tablegroup_id(), tablegroup_schema, check_dropped_schema)
{}

// Forward to the embedded ObPartitionKeyIter.
// @param partition_id [out] next partition id, as written by the embedded iterator.
// @return the embedded iterator's return code; only non-OB_ITER_END failures are logged.
int ObTablegroupPartitionKeyIter::next_partition_id_v2(int64_t& partition_id)
{
  int ret = partition_key_iter_.next_partition_id_v2(partition_id);
  if (OB_SUCCESS != ret && OB_ITER_END != ret) {
    LOG_WARN("Get next partition id failed", K(ret));
  }
  return ret;
}

// Fetch the next partition id of the tablegroup and build its pg key.
// @param pkey [out] initialized with (tablegroup id, partition id, partition cnt) on success.
// @return OB_SUCCESS, OB_ITER_END at the end of iteration, or the error from
//         next_partition_id_v2() / pkey.init().
int ObTablegroupPartitionKeyIter::next_partition_key_v2(common::ObPGKey& pkey)
{
  int ret = OB_SUCCESS;
  int64_t next_id = 0;
  if (OB_SUCC(next_partition_id_v2(next_id))) {
    if (OB_FAIL(pkey.init(get_tablegroup_id(), next_id, get_partition_cnt()))) {
      LOG_WARN("Failed to init pg key", K(ret));
    }
  } else if (OB_ITER_END != ret) {
    LOG_WARN("Get next partition id failed", K(ret));
  }
  return ret;
}

int ObTablePgKeyIter::init()
{
  int ret = OB_SUCCESS;
  if (OB_INVALID_ID == table_schema_.get_tablegroup_id()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("table not in any tablegroup", K(ret), "tg_id", table_schema_.get_tablegroup_id());
  } else if (table_schema_.get_tablegroup_id() != tablegroup_id_) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN(
        "tg_id not match", K(ret), "tg_id_in_table_schema", table_schema_.get_tablegroup_id(), "tg_id", tablegroup_id_);
  } else if (!table_schema_.get_binding()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("not a binding tablegroup", K(ret), "table_id", table_schema_.get_table_id());
  } else {
  }  // check finish
  return ret;
}

// Produce the next (partition key, pg key) pair of the table.
// @param pkey  [out] initialized with (table id, partition id, table partition cnt).
// @param pgkey [out] initialized with (tablegroup id, tg partition id, 0).
// @return OB_SUCCESS, OB_ITER_END at the end of iteration, or the error reported by the
//         underlying iterator or by either key's init().
int ObTablePgKeyIter::next(common::ObPartitionKey& pkey, common::ObPGKey& pgkey)
{
  int ret = OB_SUCCESS;
  ObPartitionKeyIter::Info info;
  if (OB_FAIL(iter_.next_partition_info(info))) {
    if (OB_ITER_END != ret) {
      LOG_WARN("fail to iter next partition info", K(ret));
    }
  } else if (OB_FAIL(pkey.init(table_schema_.get_table_id(), info.partition_id_, table_schema_.get_partition_cnt()))) {
    LOG_WARN("fail to init pkey", K(ret));
  } else if (OB_FAIL(pgkey.init(
                 table_schema_.get_tablegroup_id(), info.tg_partition_id_, 0 /* partition cnt of tablegroup is 0 */))) {
    // Distinct message so a pgkey failure is not mistaken for a pkey failure in the logs.
    LOG_WARN("fail to init pgkey", K(ret));
  }
  return ret;
}

// FIXME: This interface exposes the offset of the partition in the array, which is very dangerous.
//    The dependency on this interface should be lifted in the future.
// @partition_id: phy_partition_id
// @partition_idx: the offset of partition in partition_array
//
// Three cases are handled:
//  - non-partitioned schema: only partition_id 0 is valid and maps to index 0;
//  - one-level, or two-level with templated sub-partitions: the first-level index is looked up
//    in the schema (or taken equal to the id when the part array is absent) and, for two-level,
//    combined as part_idx * def_sub_part_num + subpart_id;
//  - two-level with non-templated sub-partitions: the index is obtained by walking the
//    partitions and sub-partitions in iterator order and counting the ones skipped.
//
// @check_dropped_partition: whether dropped (sub)partitions take part in the lookup/count
// @return OB_SUCCESS on success; OB_INVALID_ARGUMENT for a negative partition_id;
//         OB_ENTRY_NOT_EXIST when the id cannot be located; OB_ERR_UNEXPECTED on inconsistent
//         state. On any failure partition_idx is OB_INVALID_INDEX.
int ObPartMgrUtils::get_partition_idx_by_id(const ObPartitionSchema& partition_schema,
    const bool check_dropped_partition, const int64_t partition_id, int64_t& partition_idx)
{
  int ret = OB_SUCCESS;
  partition_idx = OB_INVALID_INDEX;

  if (partition_id < 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid partition_id", K(ret), K(partition_id));
  } else {
    const ObPartitionLevel part_level = partition_schema.get_part_level();
    if (PARTITION_LEVEL_ZERO == part_level) {
      if (partition_id != 0) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected partition_id ", K(ret), K(partition_id));
      } else {
        partition_idx = 0;
      }
    } else if (PARTITION_LEVEL_ONE == part_level ||
               (PARTITION_LEVEL_TWO == part_level && partition_schema.is_sub_part_template())) {
      int64_t part_num = partition_schema.get_first_part_num() +
                         (check_dropped_partition ? partition_schema.get_dropped_partition_num() : 0);
      int64_t part_id_level_one = partition_id;
      int64_t part_id_level_two = 0;
      if (PARTITION_LEVEL_TWO == part_level) {
        part_id_level_one = extract_part_idx(partition_id);
        part_id_level_two = extract_subpart_idx(partition_id);
      }
      int64_t part_idx_level_one = part_id_level_one;
      int64_t part_idx_level_two = part_id_level_two;
      if (OB_NOT_NULL(partition_schema.get_part_array())) {
        if (OB_FAIL(partition_schema.get_partition_index_by_id(
                part_id_level_one, check_dropped_partition, part_idx_level_one))) {
          LOG_WARN("failed to get partition index", K(ret), K(part_id_level_one));
        }
      } else {
        // When building the table, an incomplete schema will be used to construct a balance container. For the time
        // being, it is compatible with scenarios where the partition array may be empty
      }
      if (OB_FAIL(ret)) {
      } else if (part_idx_level_one >= part_num) {
        ret = OB_ENTRY_NOT_EXIST;
        LOG_WARN("part_idx_level_one is invalid", K(ret), K(part_idx_level_one), K(part_num));
      } else if (PARTITION_LEVEL_ONE == part_level) {
        partition_idx = part_idx_level_one;
      } else if (PARTITION_LEVEL_TWO == part_level) {
        // Templated secondary partitions are not allowed to add or delete secondary partitions,
        // subpart_id is equal to subpart_idx
        int64_t subpart_num = partition_schema.get_def_sub_part_num();
        if (part_id_level_two >= subpart_num) {
          ret = OB_ENTRY_NOT_EXIST;
          LOG_WARN("part_id_level_two is invalid", K(ret), K(partition_id), K(subpart_num));
        } else {
          partition_idx = part_idx_level_one * subpart_num + part_idx_level_two;
        }
      }
    } else if (PARTITION_LEVEL_TWO == part_level && !partition_schema.is_sub_part_template()) {
      partition_idx = 0;
      const int64_t part_id = extract_part_idx(partition_id);
      const int64_t subpart_id = extract_subpart_idx(partition_id);

      // iter part: accumulate the sub-partition counts of every first-level partition
      // that precedes the target one.
      bool part_found = false;
      const ObPartition* part = NULL;
      ObPartIteratorV2 part_iter(partition_schema, check_dropped_partition);
      while (!part_found && OB_SUCC(ret) && OB_SUCC(part_iter.next(part))) {
        if (OB_ISNULL(part)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("part is null", K(ret));
        } else if (part->get_part_id() == part_id) {
          part_found = true;
        } else {
          partition_idx += part->get_sub_part_num();
          if (check_dropped_partition) {
            partition_idx += part->get_dropped_subpartition_num();
          }
        }
      }
      if (OB_FAIL(ret)) {
        // Running off the end of the iterator means the id does not exist.
        ret = OB_ITER_END == ret ? OB_ENTRY_NOT_EXIST : ret;
        LOG_WARN("iter failed", K(ret), K(partition_schema));
      } else if (!part_found || OB_ISNULL(part) || part->get_part_id() != part_id) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("partition should be finded", K(ret), K(part_found), K(part_id), KPC(part));
      }

      // iter subpart: count the sub-partitions that precede the target one inside `part`.
      if (OB_SUCC(ret) && part_found && OB_NOT_NULL(part)) {
        bool subpart_found = false;
        const ObSubPartition* subpart = NULL;
        ObSubPartIteratorV2 subpart_iter(partition_schema, *part, check_dropped_partition);
        while (!subpart_found && OB_SUCC(ret) && OB_SUCC(subpart_iter.next(subpart))) {
          if (OB_ISNULL(subpart)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("subpart is null", K(ret));
          } else if (subpart->get_sub_part_id() == subpart_id) {
            subpart_found = true;
          } else {
            partition_idx++;
          }
        }
        if (OB_FAIL(ret)) {
          ret = OB_ITER_END == ret ? OB_ENTRY_NOT_EXIST : ret;
          LOG_WARN("iter failed", K(ret), K(partition_schema));
        } else if (!subpart_found || OB_ISNULL(subpart) || subpart->get_sub_part_id() != subpart_id) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("subpartition should be finded", K(ret), K(subpart_found), K(subpart_id), KPC(part));
        }
      }
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid part level", K(ret), K(part_level));
    }
  }

  if (OB_FAIL(ret)) {
    // Never hand a partially accumulated offset back to the caller.
    partition_idx = OB_INVALID_INDEX;
  }

  return ret;
}

// Default constructor: no schema is bound (table_schema_ is NULL), so next() returns
// OB_NOT_INIT until init() is called. All index counters start at 0.
ObTablePartItemIterator::ObTablePartItemIterator()
    : table_schema_(NULL), part_iter_(), subpart_iter_(), part_(NULL), partition_idx_(0), part_idx_(0), subpart_idx_(0)
{}

// Constructs an iterator bound to `table_schema` (only its address is stored).
// part_ starts as NULL and all index counters start at 0; the underlying part iterator
// is initialized by the first call to next().
ObTablePartItemIterator::ObTablePartItemIterator(const ObSimpleTableSchemaV2& table_schema)
    : table_schema_(&table_schema),
      part_iter_(),
      subpart_iter_(),
      part_(NULL),
      partition_idx_(0),
      part_idx_(0),
      subpart_idx_(0)
{}

// (Re)bind the iterator to `table_schema` and rewind it.
// Clearing part_ makes the next call to next() initialize the part iterator again.
void ObTablePartItemIterator::init(const ObSimpleTableSchemaV2& table_schema)
{
  // Rewind every positional counter, drop the current partition, then bind the schema.
  subpart_idx_ = 0;
  part_idx_ = 0;
  partition_idx_ = 0;
  part_ = NULL;
  table_schema_ = &table_schema;
}

// Produce the next partition item of the bound table schema.
//
// `item` is reset first and then filled with the first-level fields (part_idx_, part_id_,
// part_num_, part_name_, part_high_bound_), the overall fields (partition_idx_,
// partition_id_, partition_num_) and, for two-level schemas, the subpart_* fields.
//
// @param item [out] description of the partition just visited; only meaningful on OB_SUCCESS.
// @return OB_SUCCESS on success, OB_NOT_INIT when no schema is bound, OB_ITER_END when the
//         underlying iterators are exhausted, OB_ERR_UNEXPECTED when an iterator yields a
//         NULL partition/sub-partition, or any other iterator error.
int ObTablePartItemIterator::next(ObPartitionItem& item)
{
  int ret = OB_SUCCESS;
  item.reset();

  // Always passed as false to the iterators' init() — presumably this excludes dropped
  // partitions; confirm against ObPartIteratorV2/ObSubPartIteratorV2.
  bool check_dropped_partition = false;
  if (OB_ISNULL(table_schema_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("should init table_schema", K(ret));
  }
  // No current first-level partition: (re)initialize the first-level iterator.
  if (OB_SUCC(ret) && OB_ISNULL(part_)) {
    part_iter_.init(*table_schema_, check_dropped_partition);
  }
  if (OB_FAIL(ret)) {
  } else if (OB_ISNULL(part_) || PARTITION_LEVEL_TWO != table_schema_->get_part_level()) {
    // Either this is the first step, or the schema is not two-level and every call
    // consumes one first-level partition.
    if (OB_FAIL(part_iter_.next(part_))) {
      if (ret != OB_ITER_END) {
        LOG_WARN("get next partition failed", K(ret));
      }
    } else if (OB_ISNULL(part_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("partition is null", K(ret));
    } else if (FALSE_IT(subpart_idx_ = 0)) {
      // FALSE_IT only performs the assignment; this branch body is never taken.
    } else if (PARTITION_LEVEL_TWO == table_schema_->get_part_level()) {
      subpart_iter_.init(*table_schema_, *part_, check_dropped_partition);
    }
  }
  if (OB_FAIL(ret)) {
  } else if (PARTITION_LEVEL_TWO != table_schema_->get_part_level()) {
    // Not two-level: the item is described by the first-level partition alone.
    item.part_idx_ = part_idx_++;
    item.part_id_ = part_->get_part_id();
    item.part_num_ = table_schema_->get_first_part_num();
    item.part_name_ = &(part_->get_part_name());
    item.part_high_bound_ = &(part_->get_high_bound_val());

    item.partition_idx_ = partition_idx_++;
    item.partition_id_ = part_->get_part_id();
    item.partition_num_ = table_schema_->get_all_part_num();
  } else {
    // Two-level: take the next sub-partition of the current first-level partition, moving
    // on to the next first-level partition when the current one is exhausted.
    const ObSubPartition* subpart = NULL;
    if (OB_FAIL(subpart_iter_.next(subpart))) {
      if (ret != OB_ITER_END) {
        LOG_WARN("get next subpart failed", K(ret));
      } else if (OB_FAIL(part_iter_.next(part_))) {
        if (ret != OB_ITER_END) {
          LOG_WARN("get next part failed", K(ret));
        }
      } else if (OB_ISNULL(part_)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("partition is null", K(ret));
      } else {
        // Entered a new first-level partition: bump its index and restart the
        // sub-partition index and iterator.
        part_idx_++;
        subpart_idx_ = 0;
        subpart_iter_.init(*table_schema_, *part_, check_dropped_partition);
        if (OB_FAIL(subpart_iter_.next(subpart))) {
          if (ret != OB_ITER_END) {
            LOG_WARN("get next subpart failed", K(ret));
          }
        }
      }
    }
    if (OB_FAIL(ret)) {
    } else if (OB_ISNULL(subpart)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("subpartition is null", K(ret));
    } else {
      // part_idx_ is not advanced here; it only moves when a new first-level partition
      // is entered above.
      item.part_idx_ = part_idx_;
      item.part_id_ = part_->get_part_id();
      item.part_num_ = table_schema_->get_first_part_num();
      item.part_name_ = &(part_->get_part_name());
      item.part_high_bound_ = &(part_->get_high_bound_val());

      item.subpart_idx_ = subpart_idx_++;
      item.subpart_id_ = subpart->get_sub_part_id();
      item.subpart_num_ = part_->get_sub_part_num();
      item.subpart_name_ = &(subpart->get_part_name());
      item.subpart_high_bound_ = &(subpart->get_high_bound_val());

      item.partition_idx_ = partition_idx_++;
      // The physical partition id combines the first-level and sub-partition ids.
      item.partition_id_ = common::generate_phy_part_id(part_->get_part_id(), subpart->get_sub_part_id());
      item.partition_num_ = table_schema_->get_all_part_num();
    }
  }
  return ret;
}

}  // namespace schema
}  // namespace share
}  // namespace oceanbase
